package com.atguigu.flink.state.keyedstate;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/17

 示例: 计算每种传感器的平均水位
 */
public class Demo5_AggregateState
{
    public static void main(String[] args) {
        
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);


        /*EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        env.setStateBackend(rocksDBStateBackend);*/

        env
                   .socketTextStream("hadoop102", 8888)
                   .map(new WaterSensorMapFunction())
                   //按照id分组
                    .keyBy(WaterSensor::getId)
                    .process(new KeyedProcessFunction<String, WaterSensor, String>()
                    {

                        private AggregatingState<Integer, Double> avgVc;

                        //在Task被创建后，从备份中恢复
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            avgVc = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("avgVc",
                                new AggregateFunction<Integer, Tuple2<Double, Integer>, Double>()
                                {
                                    @Override
                                    public Tuple2<Double, Integer> createAccumulator() {
                                        return Tuple2.of(0d, 0);
                                    }

                                    @Override
                                    public Tuple2<Double, Integer> add(Integer value, Tuple2<Double, Integer> accumulator) {
                                        accumulator.f0 = accumulator.f0 + value;
                                        accumulator.f1 = accumulator.f1 + 1;
                                        return accumulator;
                                    }

                                    @Override
                                    public Double getResult(Tuple2<Double, Integer> accumulator) {
                                        return accumulator.f0 / accumulator.f1;
                                    }

                                    //无需实现
                                    @Override
                                    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                                        return null;
                                    }
                                }, Types.TUPLE(Types.DOUBLE, Types.INT)));
                        }

                        @Override
                        public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {

                            //加入到状态中，就可以自动聚合
                            avgVc.add(value.getVc());

                            out.collect(ctx.getCurrentKey() +" avgVc:"+avgVc.get());


                        }
                    })
                    .print();
        
                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
        
    }
}
